{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Minimal mnist example on Hopsworks\n",
    "---\n",
    "\n",
    "<font color='red'> <h3>Tested with TensorFlow 1.11</h3></font>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Hops Experiment paradigm <a class=\"anchor\" id='paradigm'></a>\n",
    "\n",
    "To be able to run your TensorFlow code on Hops, the code for the whole program needs to be provided and put inside a wrapper function. Everything, from importing libraries to reading data and defining the model and running the program needs to be put inside a wrapper function. If you wish to run gridsearch over a given set of hyperparameters, you can define arguments for this wrapper function that corresponds to the name of your hyperparameters.\n",
    "\n",
    "You can also submit one or more `.py`, `.zip` or `.egg` files that contain your code and import them in the wrapper function. To include files, navigate back to HopsWorks and restart restart Jupyter, you can then include files in the Jupyter configuration.\n",
    "\n",
    "## The `hops` python module\n",
    "\n",
    "Below you can see the aforementioned wrapper function, which is coincidently named `wrapper` but could potentially be named anything. You can see two imports from the `hops` module, a `tensorboard` and an `hdfs` module. These are the only two modules that you will need to use in your TensorFlow wrapper function. \n",
    "\n",
    "### Using the `tensorboard` module\n",
    "The `tensorboard` module allow us to get the log directory for summaries and checkpoints to be written to the TensorBoard we will see in a bit. The only function that we currently need to call is `tensorboard.logdir()`, which returns the path to the TensorBoard log directory. Furthermore, the content of this directory will be put in as a Dataset in your project in HopsFS after each hyperparameter configuration is finished. The `experiment.launch` function, that we will look at abit further down will return the exact path, which you can then navigate to using HopsWorks to inspect the files.\n",
    "\n",
    "The directory could in practice be used to store other data that should be accessible after each hyperparameter configuration is finished.\n",
    "```python\n",
    "# Use this module to get the TensorBoard logdir\n",
    "from hops import tensorboard\n",
    "tensorboard_logdir = tensorboard.logdir()\n",
    "```\n",
    "\n",
    "\n",
    "### Using the `hdfs` module\n",
    "The `hdfs` module provides a single method to get the path in HopsFS where your data is stored, namely by calling `hdfs.project_path()`. The path resolves to the root path for your project, which is the view that you see when you click `Data Sets` in HopsWorks. To point where your actual data resides in the project you to append the full path from there to your Dataset. For example if you create a mnist folder in your Resources Dataset, which is created automatically for each project, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`\n",
    "```python\n",
    "# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project\n",
    "from hops import hdfs\n",
    "project_path = hdfs.project_path()\n",
    "```\n",
    "\n",
    "![image11-Dataset-ProjectPath.png](../../images/datasets.png)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def parameter_server_mnist():\n",
    "\n",
    "    import tensorflow as tf\n",
    "    from hops import devices\n",
    "    from hops import tensorboard\n",
    "    import os\n",
    "    import json\n",
    "    \n",
    "    # Extract information about cluster\n",
    "    tf_cluster = json.loads(os.environ[\"TF_CONFIG\"])\n",
    "    \n",
    "    # Workers + chief is the number of processes reading from HDFS\n",
    "    num_workers = len(tf_cluster[\"cluster\"][\"worker\"]) + 1\n",
    "    \n",
    "    # Both chief and workers needs unique task_index for sharding\n",
    "    task_index = 0\n",
    "    if tf_cluster[\"task\"][\"type\"] == 'chief':\n",
    "        task_index = num_workers - 1\n",
    "    else:\n",
    "        task_index = tf_cluster[\"task\"][\"index\"]\n",
    "    \n",
    "    PREDICT = tf.estimator.ModeKeys.PREDICT\n",
    "    EVAL = tf.estimator.ModeKeys.EVAL\n",
    "    TRAIN = tf.estimator.ModeKeys.TRAIN\n",
    "    learning_rate=0.002\n",
    "    batch_size=128\n",
    "    training_steps=5000\n",
    "    \n",
    "    train_filenames = [hdfs.project_path() + \"TourData/mnist/train/train.tfrecords\"]\n",
    "    validation_filenames = [hdfs.project_path() + \"TourData/mnist/validation/validation.tfrecords\"]\n",
    "\n",
    "    def build_estimator(config):\n",
    "        \"\"\"\n",
    "        Build the estimator based on the given config and params.\n",
    "        Args:\n",
    "            config (RunConfig): RunConfig object that defines how to run the Estimator.\n",
    "            params (object): hyper-parameters (can be argparse object).\n",
    "        \"\"\"\n",
    "        return tf.estimator.Estimator(\n",
    "            model_fn=model_fn,\n",
    "            config=config,\n",
    "        )\n",
    "\n",
    "\n",
    "    def model_fn(features, labels, mode):\n",
    "        \"\"\"Model function used in the estimator.\n",
    "        Args:\n",
    "            features (Tensor): Input features to the model.\n",
    "            labels (Tensor): Labels tensor for training and evaluation.\n",
    "            mode (ModeKeys): Specifies if training, evaluation or prediction.\n",
    "            params (object): hyper-parameters (can be argparse object).\n",
    "        Returns:\n",
    "            (EstimatorSpec): Model to be run by Estimator.\n",
    "        \"\"\"\n",
    "        \n",
    "        features = tf.cast(features, tf.float32)\n",
    "        # Define model's architecture\n",
    "        logits = architecture(features, mode)\n",
    "        class_predictions = tf.argmax(logits, axis=-1)\n",
    "        # Setup the estimator according to the phase (Train, eval, predict)\n",
    "        loss = None\n",
    "        train_op = None\n",
    "        eval_metric_ops = {}\n",
    "        predictions = class_predictions\n",
    "        # Loss will only be tracked during training or evaluation.\n",
    "        if mode in (TRAIN, EVAL):\n",
    "            loss = tf.losses.sparse_softmax_cross_entropy(\n",
    "                labels=tf.cast(labels, tf.int32),\n",
    "                logits=logits)\n",
    "        # Training operator only needed during training.\n",
    "        if mode == TRAIN:\n",
    "            train_op = get_train_op_fn(loss)\n",
    "        # Evaluation operator only needed during evaluation\n",
    "        if mode == EVAL:\n",
    "            eval_metric_ops = {\n",
    "                'accuracy': tf.metrics.accuracy(\n",
    "                    labels=labels,\n",
    "                    predictions=class_predictions,\n",
    "                    name='accuracy')\n",
    "            }\n",
    "        # Class predictions and probabilities only needed during inference.\n",
    "        if mode == PREDICT:\n",
    "            predictions = {\n",
    "                'classes': class_predictions,\n",
    "                'probabilities': tf.nn.softmax(logits, name='softmax_tensor')\n",
    "            }\n",
    "        return tf.estimator.EstimatorSpec(\n",
    "            mode=mode,\n",
    "            predictions=predictions,\n",
    "            loss=loss,\n",
    "            train_op=train_op,\n",
    "            eval_metric_ops=eval_metric_ops\n",
    "        )\n",
    "\n",
    "\n",
    "    def architecture(inputs, mode, scope='MnistConvNet'):\n",
    "        \"\"\"Return the output operation following the network architecture.\n",
    "        Args:\n",
    "            inputs (Tensor): Input Tensor\n",
    "            mode (ModeKeys): Runtime mode (train, eval, predict)\n",
    "            scope (str): Name of the scope of the architecture\n",
    "        Returns:\n",
    "             Logits output Op for the network.\n",
    "        \"\"\"\n",
    "        with tf.variable_scope(scope):\n",
    "            inputs = inputs / 255\n",
    "            input_layer = tf.reshape(inputs, [-1, 28, 28, 1])\n",
    "            conv1 = tf.layers.conv2d(\n",
    "                inputs=input_layer,\n",
    "                filters=20,\n",
    "                kernel_size=[5, 5],\n",
    "                padding='valid',\n",
    "                activation=tf.nn.relu)\n",
    "            pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)\n",
    "            conv2 = tf.layers.conv2d(\n",
    "                inputs=pool1,\n",
    "                filters=40,\n",
    "                kernel_size=[5, 5],\n",
    "                padding='valid',\n",
    "                activation=tf.nn.relu)\n",
    "            pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)\n",
    "            flatten = tf.reshape(pool2, [-1, 4 * 4 * 40])\n",
    "            dense1 = tf.layers.dense(inputs=flatten, units=256, activation=tf.nn.relu)\n",
    "            dropout = tf.layers.dropout(\n",
    "                inputs=dense1, rate=0.5, training=mode==tf.estimator.ModeKeys.TRAIN)\n",
    "            dense2 = tf.layers.dense(inputs=dropout, units=10)\n",
    "            return dense2\n",
    "\n",
    "\n",
    "    def get_train_op_fn(loss):\n",
    "        \"\"\"Get the training Op.\n",
    "        Args:\n",
    "             loss (Tensor): Scalar Tensor that represents the loss function.\n",
    "             params (object): Hyper-parameters (needs to have `learning_rate`)\n",
    "        Returns:\n",
    "            Training Op\n",
    "        \"\"\"\n",
    "        optimizer = tf.train.AdamOptimizer(learning_rate)\n",
    "        train_op = optimizer.minimize(\n",
    "            loss=loss,\n",
    "            global_step=tf.train.get_global_step())\n",
    "        return train_op\n",
    "\n",
    "    def get_train_inputs(filenames, batch_size):\n",
    "\n",
    "        def parser(serialized_example):\n",
    "            \"\"\"Parses a single tf.Example into image and label tensors.\"\"\"\n",
    "            features = tf.parse_single_example(\n",
    "                serialized_example,\n",
    "                features={\n",
    "                    'image_raw': tf.FixedLenFeature([], tf.string),\n",
    "                    'label': tf.FixedLenFeature([], tf.int64),\n",
    "                })\n",
    "            image = tf.decode_raw(features['image_raw'], tf.uint8)\n",
    "            image.set_shape([28 * 28])\n",
    "\n",
    "            # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]\n",
    "            image = tf.cast(image, tf.float32) / 255 - 0.5\n",
    "            label = tf.cast(features['label'], tf.int32)\n",
    "            return image, label\n",
    "\n",
    "        \n",
    "        # Import MNIST data\n",
    "        dataset = tf.data.TFRecordDataset(filenames)\n",
    "        \n",
    "        # Shard Dataset on each worker so it gets unique samples\n",
    "        dataset.shard(num_workers, task_index)\n",
    "\n",
    "        # Map the parser over dataset, and batch results by up to batch_size\n",
    "        dataset = dataset.map(parser)\n",
    "        dataset = dataset.batch(batch_size)\n",
    "        return dataset\n",
    "\n",
    "    def get_eval_inputs(filenames, batch_size):\n",
    "\n",
    "        def parser(serialized_example):\n",
    "            \"\"\"Parses a single tf.Example into image and label tensors.\"\"\"\n",
    "            features = tf.parse_single_example(\n",
    "                serialized_example,\n",
    "                features={\n",
    "                    'image_raw': tf.FixedLenFeature([], tf.string),\n",
    "                    'label': tf.FixedLenFeature([], tf.int64),\n",
    "                })\n",
    "            image = tf.decode_raw(features['image_raw'], tf.uint8)\n",
    "            image.set_shape([28 * 28])\n",
    "\n",
    "            # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]\n",
    "            image = tf.cast(image, tf.float32) / 255 - 0.5\n",
    "            label = tf.cast(features['label'], tf.int32)\n",
    "            return image, label\n",
    "\n",
    "        \n",
    "        # Import MNIST data\n",
    "        dataset = tf.data.TFRecordDataset(filenames)\n",
    "\n",
    "        # Map the parser over dataset, and batch results by up to batch_size\n",
    "        dataset = dataset.map(parser)\n",
    "        dataset = dataset.batch(batch_size)\n",
    "        return dataset\n",
    "    \n",
    "\n",
    "     # Read parameters and input data\n",
    "    mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()\n",
    "        \n",
    "    config = tf.estimator.RunConfig(\n",
    "        experimental_distribute=tf.contrib.distribute.DistributeConfig(\n",
    "            train_distribute=tf.contrib.distribute.ParameterServerStrategy(\n",
    "                num_gpus_per_worker=devices.get_num_gpus()),\n",
    "            eval_distribute=tf.contrib.distribute.MirroredStrategy(\n",
    "                num_gpus_per_worker=devices.get_num_gpus())),\n",
    "            model_dir=tensorboard.logdir(),\n",
    "            save_summary_steps=100,\n",
    "            log_step_count_steps=100,\n",
    "            save_checkpoints_steps=500)\n",
    "        # Setup the Estimator\n",
    "    model_estimator = build_estimator(config)\n",
    "    # Setup and start training and validation\n",
    "    train_spec = tf.estimator.TrainSpec(\n",
    "         input_fn=lambda: get_train_inputs(train_filenames, batch_size),\n",
    "         max_steps=training_steps)\n",
    "    eval_spec = tf.estimator.EvalSpec(\n",
    "         input_fn=lambda: get_eval_inputs(validation_filenames, batch_size),\n",
    "         steps=None,\n",
    "         start_delay_secs=10,  # Start evaluating after 10 sec.\n",
    "         throttle_secs=30  # Evaluate only every 30 sec\n",
    "    )\n",
    "        \n",
    "    tf.estimator.train_and_evaluate(model_estimator, train_spec, eval_spec)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import experiment\n",
    "from hops import hdfs\n",
    "\n",
    "notebook = hdfs.project_path() + \"Jupyter/Distributed_Training/parameter_server_strategy/mnist.ipynb\"\n",
    "experiment.parameter_server(parameter_server_mnist,\n",
    "                  name='mnist estimator', \n",
    "                  description='A minimal mnist example with two hidden layers',\n",
    "                  versioned_resources=[notebook], local_logdir=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Monitoring execution - TensorBoard <a class=\"anchor\" id='tensorboard'></a>\n",
    "To find the TensorBoard for the execution, please go back to HopsWorks and enter the Experiments service.\n",
    "Then copy & paste the experiment_id into the textbox and press enter to start a TensorBoard to see all experiments being run in parallel.\n",
    "\n",
    "![Image7-Monitor.png](../../images/experiments_service.png)\n",
    "![Image7-Monitor.png](../../images/tensorboard.png)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
